package com.bawei.persona.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 * 将用户画像的按多维度的统计指标数据输出到clickhouse中去
 *
 */
public class RichClickHSPaymentProtalFunction extends RichSinkFunction<JSONObject> {
    Connection connection = null;
    PreparedStatement  ps = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        String driver = "ru.yandex.clickhouse.ClickHouseDriver" ;
        String  url = "jdbc:clickhouse://hadoop102:8123/default" ;
        String  username = "default" ;
        String  password = "" ;
        try {
            Class.forName(driver);
            connection = DriverManager.getConnection(url, username, password);
            ps = connection.prepareStatement(" insert into protrait_payment values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
        }catch (Exception e ){

        }
    }

    @Override
    public void close() throws Exception {

        try {

            if(ps!=null){
                ps.close();
            }
            if(connection!=null){
                connection.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        //特别注意，在调试过程钟发现时间长度与hbaee clickhouse 不一样，窃取保持相同
        ps.setString(1, value.getString("category1_id") );
        ps.setString(2, value.getString("category2_id") );
        ps.setString(3, value.getString("category3_id"));
        ps.setString(4, value.getString("category1_name"));
        ps.setString(5, value.getString("category2_name"));
        ps.setString(6, value.getString("category3_name"));
        ps.setString(7, value.getString("carrier"));
        ps.setString(8, value.getString("carriername"));
        ps.setString(9, value.getString("user_gendertype"));
        ps.setString(10, value.getString("user_gender"));
        ps.setString(11, value.getString("province_id"));
        ps.setString(12, value.getString("province_name"));
        ps.setString(13, value.getString("tm_name"));
        ps.setString(14, value.getString("email"));
        ps.setString(15, value.getString("emailtype"));
        ps.setString(16, value.getString("user_age"));
        ps.setString(17, value.getString("yearbasetype"));
        ps.setString(18, value.getString("yearbasename"));
        ps.setString(19, value.getString("payment_create_time"));
        ps.setLong(20, value.getLong("user_id")== null ?0L:value.getLong("user_id"));
        ps.setLong(21, value.getLong("order_id")== null ?0L:value.getLong("order_id"));
        ps.setBigDecimal(22, value.getBigDecimal("order_price")==null? new BigDecimal(0.0) :value.getBigDecimal("order_price"));

        ps.setLong(23, value.getLong("sku_num")== null ?0L:value.getLong("sku_num"));
        ps.setBigDecimal(24, value.getBigDecimal("coupon_reduce_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("coupon_reduce_amount"));
        ps.setBigDecimal(25, value.getBigDecimal("total_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("total_amount"));
        ps.setBigDecimal(26, value.getBigDecimal("activity_reduce_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("activity_reduce_amount"));
        ps.setBigDecimal(27, value.getBigDecimal("payment_total_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("payment_total_amount"));
        ps.executeUpdate();

    }
}
